package gu.simplemq.redis;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Strings;

import gu.simplemq.BaseConsumer;
import gu.simplemq.Channel;
import gu.simplemq.ChannelDispatcher;
import gu.simplemq.IConsumer;
import gu.simplemq.IMessageAdapter;
import gu.simplemq.exceptions.SmqTypeException;
import redis.clients.jedis.Jedis;
/**
 * {@link BaseConsumer}消费者模型实现,支持多个list阻塞式读取(blpop, brpop)<br>
 * 执行 {@link #subscribe(String...)}方法时会自动开启消费线程
 * @author guyadong
 *
 */
public class RedisConsumer extends BaseConsumer implements IRedisComponent,IConsumer,RedisConstants {

	private final JedisPoolLazy poolLazy;
	private final ChannelDispatcher register=new ChannelDispatcher(){
		@Override
		protected String check(String name) throws SmqTypeException {
			return RedisComponentType.Queue.check(poolLazy,name);
		}};
	/** 以秒为单位的超时参数 */
	private int timeout;
	/** 当前实例在redis上唯一不重复的ID名字 */
	private final String clientId;
	@Override
	public JedisPoolLazy getPoolLazy() {
		return poolLazy;
	}

	RedisConsumer(JedisPoolLazy poolLazy) {
		super();
		this.poolLazy = poolLazy;
		this.setTimeoutMills(DEFAULT_CONSUMER_CHECK_INTERVAL);
		clientId = "RedisConsumer_" + JedisUtils.incr(poolLazy,RedisConstants.CONSUMER_COUNTER);

	}
	private final Runnable customRunnable = new Runnable(){
		@Override
		public void run() {
			try {
				List<String> list;
				Jedis jedis = poolLazy.apply();
				try{
					String[] keys =register.getSubscribes();
					// 订阅频道为0时关闭线程
					if(0 == keys.length){
						close();
						return;
					}
					if(isFifo){
						list = jedis.blpop(timeout, keys);
					}else{
						list = jedis.brpop(timeout, keys);
					}
				}finally{
					poolLazy.free();
				}
				if(!list.isEmpty()){
					String channel = list.get(0);
					String message = list.get(1);
					register.dispatch(channel, message);
				}
			} catch (Throwable e) {
				logger.error(e.getMessage());
			}
		}
	};
	
	@Override
	protected Runnable getCustomRunnable() {
		return customRunnable;
	}
	private void addConsumer(Collection<String>chSet){
		for(String name:chSet){
			JedisUtils.sadd(name + RedisConstants.CONSUMER_SET_SUFFIX,clientId);
		}
	}
	private void addConsumer(String[] chSet){
		addConsumer(Arrays.asList(chSet));
	}
	private void removeConsumer(Collection<String>chSet){
		for(String name:chSet){
			JedisUtils.srem(name + RedisConstants.CONSUMER_SET_SUFFIX,clientId);
		}
	}
	private void removeConsumer(String []chSet){
		removeConsumer(Arrays.asList(chSet));
	}
	@Override
	public Set<Channel<?>> register(Channel<?>... channels) {
		Set<Channel<?>> chSet = register.register(channels);
		this.open();
		addConsumer(Channel.getChannelNames(chSet));
		return chSet;
	}

	@Override
	public Set<String> unregister(String... channels) {
		Set<String> chSet =  register.unregister(channels);
		removeConsumer(chSet);
		return chSet;
	}
	
	@Override
	public Set<String> unregister(Channel<?>... channels) {
		Set<String> chSet = register.unregister(channels);
		removeConsumer(chSet);
		return chSet;
	}

	@Override
	public Set<String> unregister(final IMessageAdapter<?> messageAdapter) {
		Set<String> chSet = register.unregister(messageAdapter);
		removeConsumer(chSet);
		return chSet;
	}

	@Override
	public String[] subscribe(String... channels) {		
		channels = this.register.subscribe(channels);
		this.open();
		addConsumer(channels);
		return channels;
	}

	@Override
	public String[] unsubscribe(String... channels) {
		String[] chSet = this.register.unsubscribe(channels);
		removeConsumer(chSet);
		return chSet;
	}
	
	@SuppressWarnings("rawtypes")
	@Override
	public Channel getChannel(String channel) {
		return register.getChannel(channel);
	}

	@Override
	public String[] getSubscribes() {
		return register.getSubscribes();
	}

	@Override
	public BaseConsumer setTimeoutMills(int timeoutMills) {
		super.setTimeoutMills(timeoutMills);
		this.timeout = (int) TimeUnit.SECONDS.convert(this.timeoutMills, TimeUnit.MILLISECONDS);
		return this;
	}
	
	/**
	 * 设置超时参数(秒)
	 * @param timeout (seconds)
	 * @see #setTimeoutMills(int)
	 */
	public RedisConsumer setTimeout(int timeout) {
		if(timeout>0){
			this.timeout = timeout;
			super.setTimeoutMills((int) TimeUnit.MILLISECONDS.convert(timeout, TimeUnit.SECONDS));
		}
		return this;
	}

	@Override
	public void close() {
		super.close();
		unsubscribe();
	}
	
	@Override
	public void register( Channel<?>channel, long duration, TimeUnit unit) {
		throw new UnsupportedOperationException();
	}

	/**
	 * 返回指定队列订阅的client数量
	 * @param channel
	 */
	public static int countOf(String channel){
		if(Strings.isNullOrEmpty(channel)){
			return 0;
		}
		return (int) JedisUtils.scard(channel + RedisConstants.CONSUMER_SET_SUFFIX);
	}

	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append("RedisConsumer [uri=").append(poolLazy.getCanonicalURI()).append(", subscribes=")
				.append(Arrays.toString(getSubscribes())).append(", clientId=").append(clientId).append("]");
		return builder.toString();
	}

}
